package pb.wang.streaming.kafka

import java.util.{Date, HashMap}

import org.apache.kafka.clients.producer._

/**
  * Created by admin on 2016/3/31.
  */

/**
  * Mock Kafka producer that endlessly publishes small batches of string records.
  *
  * Every 3 seconds it takes the current Unix time in seconds and sends three records to
  * the configured topic. All three carry the timestamp as their value; their keys are the
  * timestamp, the timestamp minus one and the timestamp minus two.
  *
  * Usage: KafkaClkMockProducer <metadataBrokerList> <topic>
  */
object KafkaClkMockProducer {

  /** Reports the outcome of each asynchronous send. */
  object ProducerCallback extends Callback {
    /**
      * Invoked by the producer once a send has completed.
      *
      * @param metadata  metadata of the record that was sent; it must not be relied upon
      *                  when `exception` is set
      * @param exception the failure cause, or `null` when the send succeeded
      */
    override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
      // Check the failure first so a failed send is never reported as a success and
      // `metadata` is only dereferenced when the send actually went through.
      Option(exception) match {
        case Some(cause) => System.err.println("send failed: " + cause)
        case None        => System.out.println("send to " + metadata.offset())
      }
    }
  }

  /**
    * Entry point.
    *
    * @param args `args(0)` is the Kafka bootstrap broker list, `args(1)` is the target
    *             topic; any further arguments are ignored
    */
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: KafkaClkMockProducer <metadataBrokerList> <topic> ")
      System.exit(1)
    }

    // Index explicitly instead of destructuring with `Array(brokers, topic)`, which
    // would throw a MatchError when more than two arguments are supplied.
    val brokers = args(0)
    val topic = args(1)

    // Kafka producer connection and serialization properties
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)

    try {
      // Send some messages, forever
      while (true) {
        // Current time in whole seconds
        val timestamp = new Date().getTime / 1000
        val value = String.valueOf(timestamp)

        // Three records per round: keys are timestamp, timestamp - 1, timestamp - 2
        for (offset <- 0 to 2) {
          val key = String.valueOf(timestamp - offset)
          producer.send(new ProducerRecord[String, String](topic, key, value), ProducerCallback)
        }

        Thread.sleep(3000)
      }
    } finally {
      // Release the producer if the loop is left through an exception
      // (for example an interrupted sleep).
      producer.close()
    }
  }
}
